Skip to content

Conversation

@2chanhaeng
Copy link
Contributor

Summary

Introduce the SqliteMessageQueue class along with a testing framework for
message queues. Refactor existing tests to utilize the new testMessageQueue
utility for improved test structure and readability.

Related issue

Changes

  • Added SqliteMessageQueue class to @fedify/sqlite package implementing
    the MessageQueue interface using SQLite as the backing store
  • Added testMessageQueue() utility function to @fedify/testing package
    for standardized testing of MessageQueue implementations
  • Added waitFor() and getRandomKey() helper functions to @fedify/testing
  • Refactored message queue tests across multiple packages to use
    testMessageQueue:
    • @fedify/amqp
    • @fedify/denokv
    • @fedify/postgres
    • @fedify/redis
  • Added new test file for SqliteMessageQueue
  • Exported new classes and functions from their respective modules

Benefits

  • Provides a SQLite-based message queue implementation
  • Reduces code duplication across message queue test files by providing
    a reusable test harness
  • Ensures consistent test coverage across all MessageQueue implementations
  • Makes it easier to add new MessageQueue implementations in the future
    with standardized testing

Checklist

  • Did you add a changelog entry to the CHANGES.md?
  • Did you write some relevant docs about this change (if it's a new feature)?
  • Did you write a regression test to reproduce the bug (if it's a bug fix)?
  • Did you write some tests for this change (if it's a new feature)?
  • Did you run mise test on your machine?

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @2chanhaeng, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the message queuing capabilities by introducing a new SQLite-based message queue implementation. Concurrently, it establishes a robust and standardized testing framework for message queues, which has been applied to existing implementations. This dual approach not only expands the available message queue options but also improves the consistency, reliability, and maintainability of message queue tests across the project, making future integrations and testing more efficient.

Highlights

  • New SqliteMessageQueue Implementation: Introduced a new SqliteMessageQueue class within the @fedify/sqlite package, providing a SQLite-backed message queuing solution. This implementation supports enqueueing single and multiple messages, including delayed messages, and listening for incoming messages using a polling mechanism.
  • Standardized Message Queue Testing Framework: Developed a new testMessageQueue() utility function in the @fedify/testing package. This utility provides a standardized and reusable test harness for MessageQueue implementations, ensuring consistent test coverage and reducing boilerplate code.
  • Refactoring of Existing Message Queue Tests: Refactored the message queue tests for @fedify/amqp, @fedify/denokv, @fedify/postgres, and @fedify/redis to utilize the newly created testMessageQueue() utility. This streamlines the test files, improves readability, and ensures uniformity across different message queue implementations.
  • New Testing Helper Functions: Added waitFor() and getRandomKey() helper functions to the @fedify/testing package. These functions assist in asynchronous test assertions and generating unique keys for test resources, respectively.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new SqliteMessageQueue and a standardized testing utility, testMessageQueue, to unify message queue tests across the repository. The refactoring of existing tests to use this new utility is a great step towards reducing code duplication and ensuring consistent test coverage.

My review focuses on the new implementations. I've found a couple of critical issues in the SqliteMessageQueue related to race conditions and error handling that could lead to message loss and listener crashes. I've also identified areas for improvement in the new testMessageQueue utility to make it more robust and align with the MessageQueue interface. Additionally, I've suggested a minor improvement to the SQLite test to ensure proper cleanup of temporary database files.

@codecov
Copy link

codecov bot commented Jan 16, 2026

Codecov Report

❌ Patch coverage is 83.02326% with 73 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
packages/sqlite/src/mq.ts 79.47% 70 Missing ⚠️
packages/testing/src/mq-tester.ts 96.38% 3 Missing ⚠️
Files with missing lines Coverage Δ
packages/postgres/src/mq.ts 91.07% <ø> (+0.59%) ⬆️
packages/sqlite/src/mod.ts 100.00% <100.00%> (ø)
packages/testing/src/mod.ts 100.00% <100.00%> (ø)
packages/testing/src/mq-tester.ts 96.38% <96.38%> (ø)
packages/sqlite/src/mq.ts 79.47% <79.47%> (ø)

... and 2 files with indirect coverage changes

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@2chanhaeng 2chanhaeng marked this pull request as ready for review January 17, 2026 22:34
@sij411
Copy link
Contributor

sij411 commented Jan 18, 2026

How about adding [Symbol.dispose]() for consistency with other MessageQueue implementations?

@2chanhaeng
Copy link
Contributor Author

How about adding [Symbol.dispose]() for consistency with other MessageQueue implementations?

That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it?

2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 18, 2026
2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 18, 2026
2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 18, 2026
SqliteMessageQueue.listen

fedify-dev#526 (comment)
@sij411
Copy link
Contributor

sij411 commented Jan 19, 2026

How about adding [Symbol.dispose]() for consistency with other MessageQueue implementations?

That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it?

Well, the cleanup method belongs to sqlite/src/ mq.ts so i don't think that's out of scope if you implement it for only SqliteMessageQueue. It seems quite simple but caring other message queues should be handled in other PR like you said. (Neither amqp has that)

Copy link
Contributor Author

@2chanhaeng 2chanhaeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding [Symbol.dispose]() for consistency with other MessageQueue implementations?

That's a good idea, but it seems to go beyond #477. How about you create a separate issue for it?

Well, the cleanup method belongs to sqlite/src/ mq.ts so i don't think that's out of scope if you implement it for only SqliteMessageQueue. It seems quite simple but caring other message queues should be handled in other PR like you said. (Neither amqp has that)

Oh, OK. I added it at 80e11c6!

2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 20, 2026
2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 20, 2026
2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 20, 2026
SqliteMessageQueue.listen

fedify-dev#526 (comment)
Comment on lines 238 to 245
`DELETE FROM "${this.#tableName}"
WHERE id = (
SELECT id FROM "${this.#tableName}"
WHERE scheduled <= ?
ORDER BY scheduled
LIMIT 1
)
RETURNING id, message`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While DELETE … RETURNING is atomic, when multiple processes execute the same query simultaneously, there's a potential race condition. Using BEGIN IMMEDIATE ensures proper locking and prevents duplicate message processing.

Also, don't forget you need to handle SQLITE_BUSY error as well!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed at 7e13eec and 06301ba!

static readonly #tableNameRegex = /^[A-Za-z_][A-Za-z0-9_]{0,63}$/;
// In-memory event emitter for notifying listeners when messages are enqueued.
// Scoped per table name to allow multiple queues to coexist.
static readonly #notifyChannels = new Map<string, EventTarget>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The static #notifyChannels map can accumulate EventTarget instances indefinitely. Consider implementing cleanup logic when the last listener for a channel is removed.

This is a minor issue for most use cases, but worth noting for long-running applications with many different table names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed at a2851bf!

new EnqueueEvent(delayMs),
);
} catch (error) {
this.#db.exec("ROLLBACK");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, errors are caught and re-thrown without logging. Adding error logs would help with debugging:

Suggested change
this.#db.exec("ROLLBACK");
this.#db.exec("ROLLBACK");
logger.error("Failed to enqueue messages: {error}", { error, messages });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed at c1697fc!

Comment on lines 139 to 144
this.#db
.prepare(
`INSERT INTO "${this.#tableName}" (id, message, created, scheduled)
VALUES (?, ?, ?, ?)`,
)
.run(id, encodedMessage, now, scheduled);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with busy_timeout set, SQLITE_BUSY errors can still occur when the timeout expires. In a multi-process message queue, this is a normal condition that requires retry logic.

Consider also adding retry logic to enqueueMany() and listen() methods!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed at 06301ba!

if (this.#initialized) {
return;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without journal_mode=WAL, SQLite uses the default rollback journal mode which severely limits concurrent access. This will cause reader/writer conflicts in multi-process environments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed at c128708!

2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 21, 2026
2chanhaeng added a commit to 2chanhaeng/fedify that referenced this pull request Jan 21, 2026
@2chanhaeng
Copy link
Contributor Author

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new SqliteMessageQueue and a standardized testing utility, testMessageQueue, which significantly improves the structure and maintainability of message queue tests by refactoring existing ones. The implementation of SqliteMessageQueue is well-done, showing careful consideration for SQLite's concurrency characteristics.

My review focuses on the new testing framework and the refactored tests. I've identified a critical issue in the new testMessageQueue utility that could cause tests to fail for correctly implemented message queues. Additionally, I've found that some of the refactored tests for existing message queues do not correctly test for concurrent access as intended. I've also noted a minor resource leak in the new test for SqliteMessageQueue. The provided feedback includes code suggestions to address these points.

fedify-dev#526 (review)

 -  Fixed `testMessageQueue` to pass `{ signal }` options object instead of
    `AbortController` directly to `listen()` method
 -  Fixed PostgreSQL test to share same table and channel between `mq1` and
    `mq2` instances by moving key generation outside factory function
 -  Fixed Redis test to share same queue keys between `mq1` and `mq2`
    instances by moving key generation outside factory function
 -  Fixed SQLite test cleanup to dispose both `mq1` and `mq2` instances,
    preventing resource leak, and removed redundant `mq2.drop()` call
@2chanhaeng
Copy link
Contributor Author

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new SqliteMessageQueue implementation and a standardized testMessageQueue utility. The new SQLite message queue is well-designed for single-node deployments, incorporating robust features like transaction management, busy-error retries with exponential backoff, and in-memory event notifications. The testMessageQueue utility provides a comprehensive test harness, significantly reducing code duplication and ensuring consistent test coverage across all message queue implementations. Existing message queue tests have been successfully refactored to leverage this new utility, improving readability and maintainability. The documentation has been updated to reflect the new SQLite message queue, and new mise tasks have been added for per-package operations, enhancing the development workflow.

Comment on lines +139 to +142
if (!SqliteMessageQueue.#tableNameRegex.test(this.#tableName)) {
throw new Error(
`Invalid table name for the message queue: ${this.#tableName}`,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The regular expression #tableNameRegex allows underscores (_) in table names, but the error message only mentions "letters, digits". It would be clearer to explicitly state that underscores are also allowed.

      throw new Error(
        `Invalid table name for the message queue: ${this.#tableName}. Only letters, digits, and underscores are allowed.`,
      );

Comment on lines +285 to +298
const result = await this.#withTransactionRetries(() => {
return this.#db
.prepare(
`DELETE FROM "${this.#tableName}"
WHERE id = (
SELECT id FROM "${this.#tableName}"
WHERE scheduled <= ?
ORDER BY scheduled
LIMIT 1
)
RETURNING id, message`,
)
.get(now) as { id: string; message: string } | undefined;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The DELETE ... RETURNING statement is wrapped in #withTransactionRetries. While this ensures atomicity and retries for the delete operation itself, the poll function's while loop continues to fetch and delete messages. If handler(message) fails, the message is re-enqueued (due to throw error at line 319), but the poll loop immediately breaks (line 327). This means that if a message consistently fails, it might block the queue from processing other messages until the next poll interval or enqueue event. Consider if the poll loop should continue attempting to process other messages even after a handler failure, or if a failing message should be moved to a dead-letter queue or have its scheduled time updated to prevent immediate re-processing.

Comment on lines +350 to +361
while (signal == null || !signal.aborted) {
let timeout: ReturnType<typeof setTimeout> | undefined;
await new Promise<unknown>((resolve) => {
signal?.addEventListener("abort", resolve);
timeout = setTimeout(() => {
signal?.removeEventListener("abort", resolve);
resolve(0);
}, this.#pollIntervalMs);
timeouts.add(timeout);
});
if (timeout != null) timeouts.delete(timeout);
await poll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The setTimeout in the periodic polling loop (lines 354-359) is added to timeouts but never removed if the signal aborts before the timeout fires. This could lead to a memory leak if many listen calls are made and aborted before their respective timeouts complete. The if (timeout != null) timeouts.delete(timeout); at line 360 only handles the case where the timeout does fire.

      await new Promise<unknown>((resolve) => {
        const abortListener = () => {
          if (timeout != null) clearTimeout(timeout);
          resolve();
        };
        signal?.addEventListener("abort", abortListener, { once: true });
        timeout = setTimeout(() => {
          signal?.removeEventListener("abort", abortListener);
          resolve(0);
        }, this.#pollIntervalMs);
        timeouts.add(timeout);
      });
      if (timeout != null) timeouts.delete(timeout);

Comment on lines +426 to +434
this.#db.close();
this.#unregisterInstance();
} catch (error) {
logger.error(
"Failed to close the database connection for table {tableName}: {error}",
{ tableName: this.#tableName, error },
);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The [Symbol.dispose]() method logs an error if this.#db.close() fails, but it doesn't prevent the instance from being unregistered. If close() fails, the database might still be open or in an inconsistent state, and unregistering the instance might lead to further issues or resource leaks. Consider re-throwing the error or at least making the unregistration conditional on successful closing.

Comment on lines +70 to +73
await mq1.enqueue("Hello, world!");
await waitFor(() => messages.length > 0, 15_000);
deepStrictEqual(messages, ["Hello, world!"]);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The deepStrictEqual assertion here compares messages (an array) directly to ["Hello, world!"]. Given that messages are processed by multiple listeners (mq1.listen and mq2.listen), and only one listener should receive each message, the messages array might contain the message once, but the order or which listener received it isn't guaranteed. While the current test might pass due to quick processing, it's safer to assert that the message is included in the collected messages, or that the total count is correct, rather than a strict equality of the array content and order, especially in a multi-listener scenario.

Suggested change
await mq1.enqueue("Hello, world!");
await waitFor(() => messages.length > 0, 15_000);
deepStrictEqual(messages, ["Hello, world!"]);
await waitFor(() => messages.length > 0, 15_000);
ok(messages.includes("Hello, world!"), "'Hello, world!' message should be received");
deepStrictEqual(messages.length, 1, "Only one 'Hello, world!' message should be received");

Comment on lines +80 to +81
deepStrictEqual(messages, ["Hello, world!", "Delayed message"]);
ok(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the previous comment, deepStrictEqual(messages, ["Hello, world!", "Delayed message"]) might be too strict. The order of messages, especially when delays are involved and multiple listeners are active, is not strictly guaranteed to be ["Hello, world!", "Delayed message"]. It's more robust to check for the presence of both messages and the total count.

    await waitFor(() => messages.length > 1, 15_000);
    ok(messages.includes("Hello, world!"), "'Hello, world!' message should be received");
    ok(messages.includes("Delayed message"), "'Delayed message' should be received");
    deepStrictEqual(messages.length, 2, "Two messages should be received");

];
await mq1.enqueueMany(batchMessages);
await waitFor(() => messages.length >= batchMessages.length, 15_000);
deepStrictEqual(new Set(messages), new Set(batchMessages));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The assertion deepStrictEqual(new Set(messages), new Set(batchMessages)) correctly checks for the presence of all messages regardless of order. However, it doesn't explicitly verify that each message was received exactly once, which is a crucial aspect of message queue behavior (each message should be consumed by only one listener). While the testMessageQueue description mentions "Ensures messages are processed by only one listener", this specific assertion doesn't fully cover it. Consider adding a check for the total count of messages received to ensure no duplicates or missing messages.

      deepStrictEqual(new Set(messages), new Set(batchMessages));
      deepStrictEqual(messages.length, batchMessages.length, "All batch messages should be received exactly once");

() => messages.length >= delayedBatchMessages.length,
15_000,
);
deepStrictEqual(new Set(messages), new Set(delayedBatchMessages));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the previous comment, this assertion checks for presence but not necessarily for unique consumption. Adding a length check would strengthen the test for the "processed by only one listener" guarantee.

      deepStrictEqual(new Set(messages), new Set(delayedBatchMessages));
      deepStrictEqual(messages.length, delayedBatchMessages.length, "All delayed batch messages should be received exactly once");

const expectedMessages = new Set(
Array.from({ length: bulkCount }, (_, i) => `message-${i}`),
);
deepStrictEqual(new Set(messages), expectedMessages);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For the bulk enqueue stress test, deepStrictEqual(new Set(messages), expectedMessages) checks for presence. To fully ensure that each of the bulkCount messages was received exactly once and no duplicates or missing messages occurred, an additional check on the length of the messages array would be beneficial.

    deepStrictEqual(new Set(messages), expectedMessages);
    deepStrictEqual(messages.length, bulkCount, "All bulk messages should be received exactly once");

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SqliteMessageQueue for single-node deployments

3 participants